feat: enhance event streaming with Last-Event-ID support#22
Merged
Conversation
smxzk32145745
commented
Jun 4, 2026
Contributor
- Implemented Last-Event-ID header and last_event_id query parameter for clients to receive missed events upon reconnection.
- Updated EventBus to support durable event replay using Redis Streams.
- Enhanced SSE event handling in both backend and frontend to include event IDs.
- Added tests for event replay functionality to ensure reliability and correctness.
- Implemented Last-Event-ID header and last_event_id query parameter for clients to receive missed events upon reconnection. - Updated EventBus to support durable event replay using Redis Streams. - Enhanced SSE event handling in both backend and frontend to include event IDs. - Added tests for event replay functionality to ensure reliability and correctness.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR enhances the run SSE event stream to support durable replay on reconnect via the Last-Event-ID mechanism (and a last_event_id query param), spanning frontend consumption, Python backend event streaming, and Java API SSE bridging.
Changes:
- Add event IDs to SSE frames and propagate them through the frontend connection + hooks.
- Introduce replay-capable event buses (in-memory log + Redis Streams) and use
Last-Event-ID/last_event_idto replay missed events before resuming live delivery. - Add backend tests covering replay behavior and event ID presence.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| frontend/lib/useRunEventSource.ts | Stores received SSE eventId alongside events in React state. |
| frontend/lib/run-event-connection.ts | Tracks lastEventId from SSE frames and reconnects using last_event_id query param; passes event IDs to handlers. |
| backend/tests/test_event_replay.py | Adds tests for replay via header/query param and verifies event IDs are emitted; includes Redis replay test with fakeredis. |
| backend/app/events/bus.py | Extends EventBus to return event IDs, adds replay API, and implements Redis Streams-backed durable replay. |
| backend/app/core/config.py | Adds Redis stream configuration settings (prefix/suffix/max len). |
| backend/app/api/v1/events.py | Implements SSE replay + catch-up logic using Last-Event-ID / last_event_id and emits SSE id fields. |
| backend-java/src/main/resources/application.yml | Adds stream replay configuration for the Java service. |
| backend-java/src/main/java/io/agentflow/api/service/EventStreamService.java | Adds Redis Stream replay + Last-Event-ID support and envelope parsing for pub/sub messages. |
| backend-java/src/main/java/io/agentflow/api/controller/EventsController.java | Accepts Last-Event-ID header and last_event_id query param and forwards to service. |
| backend-java/src/main/java/io/agentflow/api/config/AgentflowProperties.java | Adds stream replay properties (suffix/max len) under agentflow.events. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+31
to
+37
| def _is_after(entry_id: str, after_id: str | None) -> bool: | ||
| if after_id is None: | ||
| return True | ||
| try: | ||
| return int(entry_id) > int(after_id) | ||
| except ValueError: | ||
| return entry_id > after_id |
Comment on lines
+117
to
+121
| try { | ||
| replay(emitter, streamKey, lastSentId.get(), lastSentId); | ||
| } catch (Exception e) { | ||
| log.warn("Failed catch-up replay for run {}", runId, e); | ||
| } |
Comment on lines
+212
to
+217
| private static boolean isAfter(String candidate, String lastId) { | ||
| try { | ||
| return Long.parseLong(candidate) > Long.parseLong(lastId); | ||
| } catch (NumberFormatException ex) { | ||
| return candidate.compareTo(lastId) > 0; | ||
| } |
Comment on lines
+87
to
+90
| if event_id: | ||
| last_id = event_id | ||
|
|
||
| yield _sse_frame(event_id or "0", event) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.